/**
 * FileName: StreamingFilter
 * Author:   SAMSUNG-PC 孙中军
 * Date:     2019/3/18 15:09
 * Description: 实现对数据的过滤
 */
package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.util.ColumnsUtil;
import org.apache.commons.lang.StringUtils;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;

public class StreamingFilter {

    public static void main(String[] args) {
        /**
         * SparkSession 是 Spark SQL 的入口。
         * 使用 Dataset 或者 Datafram 编写 Spark SQL 应用的时候，第一个要创建的对象就是 SparkSession。
         */
        SparkSession sparkSession = SparkSession
                .builder()
                .getOrCreate();
        /**
         * 从kafka读取数据，为流式查询创建Kafka源
         */
        Dataset<Row> kafkaDataset = sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
                .option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
                .option("startingOffsets", ConfigurationManager.getProperty(Constants.KAFKA_AUTO_OFFSET_RESET))
                .load();

        Dataset<String> stringDataset = kafkaDataset
                .selectExpr("CAST(value AS STRING)")
                .as(Encoders.STRING());
        /**
         * 使用filter方法进行数据过滤，只有数据元素大于5个数据才会被保留
         * 注意：lambda表达式需要强制类型转换
         */
        Dataset<String> filterResult = stringDataset.filter((FilterFunction<String>) line -> {
            String[] lines = StringUtils.splitPreserveAllTokens(line, "|");
            if (lines.length > 5) {
                return true;
            }
            return false;
        });

        /**
         * 将value列转换为多列，规则在column-mapping.properties中进行定义
         */
        Dataset<Row> multiColumnDataset = ColumnsUtil
                .getInstance()
                .getMultiColumnDataset(kafkaDataset, "value");
        kafkaDataset.printSchema();

        /**
         * 使用SQL进行数据的过滤
         */
        multiColumnDataset.createOrReplaceTempView("kafka");
        Dataset<Row> sql = sparkSession.sql("select * from kafka where phone !='null'");

        /**
         * 打印结果输出到控制台
         */
        StreamingQuery query = sql
                .writeStream()
                .format("console")
                .outputMode(OutputMode.Append())
                .option("truncate", "false")
                .start();

        try {
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }
}
